HiveMQ MQTT Client是一個 Java 客戶端庫,用於在 Java 應用程序中實現 MQTT 客戶端。它是由 HiveMQ 團隊開發的庫,幫助開發者輕鬆地集成 MQTT 通信功能到他們的 Java 應用程序中。 HiveMQ MQTT Client 可以用於創建 MQTT 客戶端,以連接到 MQTT broker並執行發布、訂閱等操作。
MQTT 是一種網絡協議,用於在客戶端和服務器之間進行消息傳遞。它是一種輕量級、發布/訂閱型協議,特別適用於物聯網(IoT)設備之間的通信。
MQTT broker 是充當消息的中間服務器,負責接收、轉發和分發 MQTT 消息。
<dependency>
    <groupId>com.hivemq</groupId>
    <artifactId>hivemq-mqtt-client</artifactId>
    <version>1.3.0</version>
</dependency>
//創建MQTT5Client
Mqtt5Client client = Mqtt5Client.builder()
        .identifier(clientId)
        .serverHost(broker)
        .serverPort(1883)
        .build();
當執行某個阻塞操作時,程式會等待,直到該操作完成並返回結果,然後才能繼續執行其他任務。
Mqtt5BlockingClient blockingClient = Mqtt5Client.builder().buildBlocking();
在非同步編程中,任務可以提交給執行緒,然後繼續執行其他任務,當任務完成時,可以通過回調或其他機制來處理結果。
Mqtt5AsyncClient asyncClient = Mqtt5Client.builder().buildAsync();
它建立在非同步編程的基礎上,強調事件的流動和數據的響應。在反應式編程中,系統可以對數據流中的事件做出響應,執行相應的操作。
Mqtt5RxClient rxClient = Mqtt5Client.builder().buildRx();
MqttConfig.java
@Slf4j
@Configuration
@RequiredArgsConstructor
public class MqttConfig {
    private final String topic = "test/"; // 訂閱的 MQTT 主題
    // 客戶端識別ID
    String clientId = "clientA";
    // MQTT 伺服器位址
    String broker = "broker.hivemq.com";
    @Bean
    public void connectMqttClient() {
        //創建MQTT5Client
        Mqtt5AsyncClient client = Mqtt5Client.builder()
                .identifier(clientId)
                .serverHost(broker)
                .serverPort(1883)
                .buildAsync();
        //連接
        client.connect()
                .whenComplete((connAck, throwable) -> {
                    if (throwable != null) {
                        //發生錯誤時
                        log.error(throwable.getMessage());
                    } else {
                        //成功連接時
                        log.error("連接成功");
                        //主動訂閱主題
                        client.subscribeWith()
                                .topicFilter(topic)
                                .callback(this::onMessageReceived)  //將發送到主題中的訊息做處理
                                .send()
                                .whenComplete((subAck, throwable_sub) -> {
                                    if (throwable_sub != null) {
                                        log.error(throwable_sub.getMessage());
                                    } else {
                                        log.error("訂閱 " + topic + "主題 成功");
                                    }
                                });
                    }
        });
    }
    //處理傳來的訊息
    private void onMessageReceived(Mqtt5Publish publish) {
        String topic = publish.getTopic().toString();
        String payload = new String(publish.getPayloadAsBytes(), StandardCharsets.UTF_8);
        log.error("Received message on topic '" + topic + "': " + payload);
    }
}